package com.qsl.datax.plugin.reader.kafkareader;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

/**
 * @author 青石路
 */
public class KafkaReader extends Reader {

    /**
     * Job-level half of the Kafka reader: validates the plugin configuration once and
     * hands an identical copy of it to every task.
     */
    public static class Job extends Reader.Job {

        /** Plugin configuration as supplied by the job; populated in {@link #init()}. */
        private Configuration originalConfig = null;

        /**
         * Loads the plugin configuration and fails fast when a mandatory key is missing.
         */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            this.validateParameter();
        }

        /** Nothing to release at job level. */
        @Override
        public void destroy() {

        }

        /**
         * Produces one configuration per task; every task receives a clone of the
         * original configuration.
         *
         * @param adviceNumber number of task configurations to produce
         * @return {@code adviceNumber} clones of the job configuration
         */
        @Override
        public List<Configuration> split(int adviceNumber) {
            List<Configuration> configurations = new ArrayList<>(adviceNumber);
            for (int i = 0; i < adviceNumber; i++) {
                configurations.add(this.originalConfig.clone());
            }
            return configurations;
        }

        /**
         * Checks that every mandatory key is present.
         *
         * <p>The consumer group id is checked here as well: Task.init() reads it with
         * getNecessaryValue, so a job without it would otherwise pass job init and only
         * fail later, once per task.
         */
        private void validateParameter() {
            this.originalConfig.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);
            this.originalConfig.getNecessaryValue(Key.TOPIC, KafkaReaderErrorCode.REQUIRED_VALUE);
            this.originalConfig.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE);
        }
    }

    /**
     * Task-level half of the Kafka reader: creates a consumer, polls the configured topic
     * and converts every message into a DataX {@link Record}.
     *
     * <p>Two message layouts are supported, selected by the read type:
     * <ul>
     *   <li>JSON - the message value is a JSON array of {@code KafkaColumn} objects whose
     *       values are converted according to the configured column types;</li>
     *   <li>TEXT - the message value is split on the field delimiter and every part is
     *       emitted as a string column.</li>
     * </ul>
     */
    public static class Task extends Reader.Task {

        private static final Logger logger = LoggerFactory.getLogger(Task.class);

        /** JAAS login module used for every mechanism except SCRAM. */
        private static final String PLAIN_LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule";
        /** JAAS login module required by the SCRAM-SHA-* mechanisms. */
        private static final String SCRAM_LOGIN_MODULE = "org.apache.kafka.common.security.scram.ScramLoginModule";
        private static final String SCRAM_PREFIX = "SCRAM";

        private Consumer<String, String> consumer;
        private String topic;
        private Configuration conf;
        private int maxPollRecords;
        private String fieldDelimiter;
        private String readType;
        /** Target type of each JSON column, in message order; only set for JSON read type. */
        private List<Column.Type> columnTypes;

        /** Closes the consumer if one was created. */
        @Override
        public void destroy() {
            logger.info("consumer close");
            if (Objects.nonNull(consumer)) {
                consumer.close();
            }
        }

        /**
         * Reads the task configuration, validates the read type / column types and
         * creates the Kafka consumer.
         *
         * @throws DataXException if the read type is unsupported, the column types are
         *                        missing for JSON read type, or a mandatory key is absent
         */
        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            this.topic = conf.getString(Key.TOPIC);
            this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
            readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);
            if (!ReadType.JSON.name().equalsIgnoreCase(readType)
                    && !ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                        String.format("您提供配置文件有误，不支持的readType[%s]", readType));
            }
            if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                List<String> columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);
                if (CollUtil.isEmpty(columnTypeList)) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                            String.format("您提供配置文件有误，readType是JSON时[%s]是必填参数，不允许为空或者留白 .", Key.COLUMN_TYPE));
                }
                convertColumnType(columnTypeList);
            }
            Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
            props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
            // Offsets are committed explicitly in transferRecord, never by the client itself.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            Configuration saslConf = conf.getConfiguration(Key.SASL);
            if (ObjUtil.isNotNull(saslConf)) {
                logger.info("配置启用了SASL认证");
                props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
                String mechanism = saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE);
                props.put(SaslConfigs.SASL_MECHANISM, mechanism);
                String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
                String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
                // SCRAM-SHA-256/512 need the SCRAM login module; PlainLoginModule only works
                // for PLAIN. The comparison is case-insensitive and locale-independent.
                boolean scram = mechanism.regionMatches(true, 0, SCRAM_PREFIX, 0, SCRAM_PREFIX.length());
                String loginModule = scram ? SCRAM_LOGIN_MODULE : PLAIN_LOGIN_MODULE;
                props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("%s required username=\"%s\" password=\"%s\";", loginModule, userName, password));
            }
            consumer = new KafkaConsumer<>(props);
        }

        /**
         * Subscribes to the topic, waits (with retries) for the first non-empty batch and
         * then keeps polling while batches come back full.
         *
         * @param recordSender channel used to create records and hand them to the writer
         */
        @Override
        public void startRead(RecordSender recordSender) {
            consumer.subscribe(CollUtil.newArrayList(topic));
            int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);
            int retries = conf.getInt(Key.RETRIES, 5);
            if (retries < 0) {
                logger.info("joinGroupSuccessRetries 配置有误[{}], 重置成默认值[5]", retries);
                retries = 5;
            }
            Duration pollTimeout = Duration.ofMillis(pollTimeoutMs);
            /*
             * The consumer is created fresh for every task, so its first poll has to join
             * the consumer group. Joining triggers a rebalance, during which no consumer
             * in the group can fetch. A poll can therefore come back empty even though the
             * topic holds data, and kafka-clients offers no API or event to learn exactly
             * when the join has completed. Hence the retries below.
             */
            ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
            for (int attempt = 0; attempt < retries && CollUtil.isEmpty(records); attempt++) {
                records = consumer.poll(pollTimeout);
                logger.info("第 {} 次重试，获取消息记录数[{}]", attempt + 1, records.count());
            }
            // Decide on what was actually received, not on the retry counter: with
            // retries = 0 a non-empty first poll must still be processed.
            if (CollUtil.isEmpty(records)) {
                logger.info("重试 {} 次后，仍未获取到消息，请确认是否有数据、配置是否正确", retries);
                return;
            }
            transferRecord(recordSender, records);
            // Keep draining while polls return full batches.
            // NOTE(review): a poll may return fewer than maxPollRecords while the topic
            // still holds data (e.g. fetch size limits) - confirm this stop condition is intended.
            do {
                records = consumer.poll(pollTimeout);
                transferRecord(recordSender, records);
            } while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);
        }

        /**
         * Converts every message of a batch into a DataX record, sends it to the writer
         * and then commits the consumed offsets asynchronously.
         *
         * <p>Messages whose value is {@code null} carry no columns; they are logged and
         * skipped instead of failing the task with a NullPointerException.
         *
         * @param recordSender channel used to create and send records
         * @param records      batch returned by the last poll; an empty batch is a no-op
         */
        private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {
            if (CollUtil.isEmpty(records)) {
                return;
            }
            for (ConsumerRecord<String, String> record : records) {
                String msgValue = record.value();
                if (msgValue == null) {
                    logger.warn("skip message with null value, topic={}, partition={}, offset={}",
                            record.topic(), record.partition(), record.offset());
                    continue;
                }
                Record sendRecord = recordSender.createRecord();
                if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                    transportJsonToRecord(sendRecord, msgValue);
                } else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                    // TEXT read type: every field is emitted as a string column.
                    // NOTE(review): String.split treats fieldDelimiter as a regular expression
                    // and drops trailing empty fields - confirm that is what callers expect.
                    String[] columnValues = msgValue.split(fieldDelimiter);
                    for (String columnValue : columnValues) {
                        sendRecord.addColumn(new StringColumn(columnValue));
                    }
                }
                recordSender.sendToWriter(sendRecord);
            }
            // Offsets are committed as soon as the batch has been handed to recordSender.
            consumer.commitAsync();
        }

        /**
         * Translates the configured column type names (case-insensitive) into
         * {@link Column.Type} values, preserving order, and stores them in
         * {@code columnTypes}.
         *
         * @param columnTypeList configured type names: STRING, LONG, DOUBLE, DATE,
         *                       BOOLEAN or BYTES
         * @throws DataXException if a type name is not supported
         */
        private void convertColumnType(List<String> columnTypeList) {
            columnTypes = new ArrayList<>(columnTypeList.size());
            for (String columnType : columnTypeList) {
                switch (columnType.toUpperCase()) {
                    case "STRING":
                        columnTypes.add(Column.Type.STRING);
                        break;
                    case "LONG":
                        columnTypes.add(Column.Type.LONG);
                        break;
                    case "DOUBLE":
                        columnTypes.add(Column.Type.DOUBLE);
                        // Without this break DOUBLE fell through and also appended DATE,
                        // shifting every following column type by one.
                        break;
                    case "DATE":
                        columnTypes.add(Column.Type.DATE);
                        break;
                    case "BOOLEAN":
                        columnTypes.add(Column.Type.BOOL);
                        break;
                    case "BYTES":
                        columnTypes.add(Column.Type.BYTES);
                        break;
                    default:
                        throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                String.format("您提供的配置文件有误，datax不支持数据类型[%s]", columnType));
                }
            }
        }

        /**
         * Parses a JSON message into columns and writes them into {@code sendRecord}
         * using the configured column types.
         *
         * @param sendRecord record to fill; column {@code i} receives the i-th JSON column
         * @param msgValue   message value, a JSON array of {@code KafkaColumn} objects
         * @throws DataXException if the number of JSON columns differs from the number of
         *                        configured column types
         */
        private void transportJsonToRecord(Record sendRecord, String msgValue) {
            List<KafkaColumn> kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);
            if (columnTypes.size() != kafkaColumns.size()) {
                throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                        String.format("您提供的配置文件有误，readType是JSON时[%s列数=%d]与[json列数=%d]的数量不匹配", Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));
            }
            for (int i = 0; i < columnTypes.size(); i++) {
                String value = kafkaColumns.get(i).getColumnValue();
                switch (columnTypes.get(i)) {
                    case STRING:
                        sendRecord.setColumn(i, new StringColumn(value));
                        break;
                    case LONG:
                        sendRecord.setColumn(i, new LongColumn(value));
                        break;
                    case DOUBLE:
                        sendRecord.setColumn(i, new DoubleColumn(value));
                        break;
                    case DATE:
                        // Only epoch timestamps are supported for now. A missing value is
                        // passed on as a null timestamp instead of failing in parseLong.
                        // NOTE(review): relies on DateColumn(Long) accepting null - confirm
                        // against the DataX version in use.
                        sendRecord.setColumn(i, new DateColumn(value == null ? null : Long.valueOf(value)));
                        break;
                    case BOOL:
                        sendRecord.setColumn(i, new BoolColumn(value));
                        break;
                    case BYTES:
                        // A missing value is passed on as a null payload instead of raising
                        // a NullPointerException.
                        // NOTE(review): relies on BytesColumn(byte[]) accepting null - confirm.
                        sendRecord.setColumn(i, new BytesColumn(value == null ? null : value.getBytes(StandardCharsets.UTF_8)));
                        break;
                    default:
                        throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                String.format("您提供的配置文件有误，datax不支持数据类型[%s]", columnTypes.get(i)));
                }
            }
        }
    }
}
